../_images/logo_orange.svg

Analyser loop

Goal

The goal of this algorithm is to determine the average derivatives and to predict the date (day and time) of the next actuator change for each sensor.

How it works

There are many steps used to calculate the date of the next actuator change step:

../_images/gif_scheme_graph_filter.gif
Steps:
  • Step 0 : Get the sensor’s measure history since last actuator action

  • Step 1 : Transform to monotonic data

  • Step 2 : Remove constant values at the beginning

  • Step 3 : Fit a custom function to remaining data points

  • Step 4 : Extend the function until it crosses the next objective value

  • Step 5 : Get cross time

  • Step 6 : Compare with last data’s date

Get the sensor’s measure history since last actuator action

../_images/scheme_graph_filter_step0.svg

An SQL request gets all the measurements associated with a sensor since the last action of an actuator that influenced the sensor.

1
2
3
4
5
6
SELECT value
FROM measure
WHERE
    id_sensor = '{0}' AND
    timeDate > '{1}'
ORDER BY timeDate ASC;

Transform to monotonic data

../_images/scheme_graph_filter_step1.svg

The algorithm must fit its function to monotonic data only.

This implies that the algorithm should discard all data recorded before the last change in the sign of the derivative.

This is achieved with a double python while loop.

Remove constant values at the beginning

../_images/scheme_graph_filter_step2.svg

When data is ordered in ascending order (ASC), this step removes all repeating data at the beginning. This improves the precision of the final estimation by an average of 20%.

In effect, the algorithm tries to fit a curve to a list of two-dimensional data points, removing constant measurements at the beginning improves the fit.

This functionality is integrated in the double while loop of Step 2

Fit a custom function to remaining data points

../_images/scheme_graph_filter_step3.svg

The algorithm now fits the following function to the remaining measurements by changing parameters a, b, and c. This is done with the function scipy.optimize.curve_fit of scipy.

\[f(x, a, b, c) = a + b \times x^{min(1,c)}\]
Curve

If the system is far from the objective, the curve is concave and the estimation has to be a line. When the system is close to the objective, the curve is convex and the estimation is perfect.

Initialisation

The initialisation of coefficients is crucial to find the right solution and reduce the calculation time.

  • a is the offset and it is initialised at the value of the last measure.

  • b is initialised at 1.

  • c is initialised between 0.1 and 1 and gradually increased by 10% each iteration.

Extend the function until it crosses the next objective value

../_images/scheme_graph_filter_step4.svg

Once the function has been generated, it is extended along the X axis to predict the future sensors measurements.

Step 3 returns the coefficients a, b, and c from which the best fit function f may be found.

We are therefore able to calculate the value of f for future time values and check when it will reach a given value.

Get cross time

../_images/scheme_graph_filter_step5.svg

Many conditions are checked in order to avoid an infinite calculation time.

A counter that can’t go over 10000 iteration.

This implies that the maximum calculation time is 24 hours.

\[max_\mathrm{time} = \tfrac{max_\mathrm{iterations}}{FPS_\mathrm{sensor}}\]

Compare with last data’s date

../_images/scheme_graph_filter_step6.svg

Finally the returned value is simply the difference between the date of the intersection of the function with the objective and the last measurement’s date.

The returned value is a time in second, but the value stored in the database is a date. It allows to update the time to target on the UI without constantly making this computation.

The average derivative is also stored in order to give useful information to the user.

Examples

Analyser examples

graph_analyser_1

graph_analyser_2

AC - Far from target

AC - Close to target

graph_analyser_3

graph_analyser_4

HE - Far from target

HE - close to target

Loop code

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def loop():
    import time
    import db
    import pubsub
    import settings
    import variables
    import numpy as np
    import math
    import random
    import scipy.optimize
    name = __name__
    Pub = pubsub.Pubsub()  # Initialize communication object
    Db = db.DB()  # Initialize data storage object
    St = settings.Settings()  # Initialize variable storage object
    Pub.get(name)  # Initialize comminication channel __name__
    time_loop = time.time()  # Start time loop (To regulate max FPS)
    while 1:  # Infinite loop start
        time.sleep(St.get("is_raspberry") * max(0, 1 / float(variables.FPS_analyser) - (time.time() - time_loop)))  # Sleep time to not trigger the script more than the FPS ratio fixed
        time_loop = time.time()  # Start time loop (To regulate max FPS)
        if Pub.get(name) or __name__ == "__main__" or not(St.get("is_raspberry")):  # triggered or test from computer (from convergence.py or from this file)
            sensors = Db.get_sensors_id()  # Get all sensor id
            for i in range(len(sensors)):
                id_sensor = sensors[i]
                actuators_involved = Db.get_actuator__sensors_id(id_sensor)  # Get all actuators that can influence the actual loop sensor
                dates = []  # Initialize list of last action of actuators
                for j in range(len(actuators_involved)):
                    id_actuator = actuators_involved[j]
                    try:
                        dates.append(Db.get_action_date(id_actuator))  # Get the last action of the actuator
                    except IndexError:  # If actuator never triggered
                        continue
                if dates == []:  # If no actuators involved
                    continue
                try:
                    Y = Db.get_measures_date_sensor(id_sensor, max(dates))  # Get all measures of the sensor since a given date
                except IndexError:  # If no measures
                    continue
                if __name__ == "__main__":  # Test block for display datas on computer
                    Y= [25.2, 25.1, 25, 24.8, 24.6]
                # print("raw",Y)
                if len(Y)<=4:  # If not enough datas
                    continue
                while True:  # Pic and constant data filter loop 
                    breaker = True  # breaker for the while loop
                    if len(Y) <= 4:  # If not enough datas
                        break  # Stop while loop
                    sens = 0
                    for j in range(len(Y) - 1):
                        m1 = Y[j]  # measure j (before)
                        m2 = Y[j + 1]  # measure j+1 (after)
                        if m1 > m2:  # if there is a decrease
                            if sens > 0:  # if there were a precedent increase
                                Y = Y[j:]  # Data removing
                                breaker = False  # Restart for loop with new datas
                                break  # Break for loop
                            else:
                                sens = -1  # Set the way (decrease)
                        elif m1 < m2:  # if there is an increase
                            if sens < 0:  # if there were a precedent decrease
                                Y = Y[j:]  # Data removing
                                breaker = False  # Restart for loop with new datas
                                break  # Break for loop
                            else:
                                sens = +1  # Set the way (decrease)
                    if breaker:  # If monotonus datas
                        break  # Stop while loop
                # print("filtered",Y)
                if len(Y) <= 4:  # If monotonus datas
                    continue
                def f(x, a, b, c):  # Function to fit to datas
                    return a + b * x **min(1, c)  # Have a look on the documentation for the justification of this function
                dt = 1 / variables.FPS_sensor  # Step time between two measures
                X = [i * dt for i in range(len(Y))]  # Generate X axis of Y datas
                target = Db.get_sensor_target_delta(id_sensor)  # get current target of the sensor
                outer = abs(Db.get_sensor_outer(id_sensor))  # Get first hysteresis relative-to-target value
                objective = target + outer * sens  # Calcul the objective (value where an actuator will change of state)
                average = sum(Y) / len(Y)  # Calcul average
                # if sens<0 and average<objective:  # Double check for no coherent datas
                    # continue
                # elif sens>0 and average>objective:  # Double check for no coherent datas
                    # continue
                precision = 10 ** -1  # power coeficient initialisation value
                while True:  # Fit function loop
                    try:  # Try to fit F on (X,Y) datas
                        popt, pcov = scipy.optimize.curve_fit(f, X, Y,
                            (
                                Y[-1],  # Initialize a coefficient (offset) with the last measure value
                                1,  # Initialize b with 1
                                precision  # Initialize c (power) with precision value (concave curve to line)
                            )
                        )
                    except Exception as e:  # If no fit found
                        pass
                    breaker = False  # initialisation of breaker for while loop
                    Xf = [1]  # X generate result
                    Yf = []  # Y generate result
                    counter = 1  # counter to avoid infinite loop (if the function tend to a finite value without crossing objective value)
                    while True:  # Extrapolation loop
                        try:
                            Yf.append(f(Xf[-1], *popt))  # Append Y value of the last X value
                        except:
                            break
                        if Xf[-1] > X[-1]:  # If start to predict the futur
                            if (Yf[-1] - objective) * sens > 0:  # if cross the objective line
                                breaker = True  # Stop the prediction
                                break
                            counter *= 1.1  # Increase counter
                        if counter > 10 ** 5:  # if there is no cross for next 24 hours
                            break
                        Xf.append(int(Xf[-1] + counter))  # Increase of one second
                    if breaker:  # If objective has been reached
                        break
                    if precision > 1.1:  # if no Fit curve found
                        break
                    precision *= 2  # Increase of the precision
                if precision > 1.1:  # If no fit found
                    continue
                if __name__ == "__main__":  # Test on computer block
                    import matplotlib.pyplot as plt
                    plt.scatter(X, Y)  # plot measures takes for calculation
                    plt.plot(Xf, Yf)  # plot calculation
                    plt.plot(Xf, [objective for i in Xf])  # plot objective
                    plt.show()  # Display graph
                if counter > 10 ** 5:  # If no cross found
                    continue
                time_to = Xf[-1] - X[-1]  # Time calculation
                date_change=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+time_to))  # add the time_to value
                Db.set_sensor_date_change(id_sensor, date_change)  # Store the change date in the database
                # print("time_to",time_to)
                # print("date_change",date_change)
                pente = popt[1]
                if sens > 0:
                    Db.set_sensor_derivative_up(id_sensor, pente)  # Store the derivative up
                elif sens < 0:
                    Db.set_sensor_derivative_down(id_sensor, pente)  # Store the derivative down
        if St.get("is_unittest") or __name__ == "__main__":
            break
../_images/header.svg